package org.wsff.tools.state.event;

import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * AbstractAsyncEventMulticaster
 *
 * @author ryan
 * @version Id: AbstractAsyncEventMulticaster.java, v 0.1 2022-03-28 09:52 ryan Exp $$
 */
@Slf4j
public abstract class AbstractAsyncEventMulticaster extends AbstractEventMulticaster {

    /**
     * multicast event
     *  @param event event
     * @param sync sync or async
     */
    @Override
    public void multicastEvent(BaseEvent event, boolean sync) {
        // get all support listeners
        List<BaseEventListener<BaseEvent>> allSupportListeners = getAllSupportListeners(event);

        // get task executor
        Executor taskExecutor = getTaskExecutor(event);

        // on event
        onEvent(event, sync, allSupportListeners, taskExecutor);
    }

    /**
     * getTaskExecutor
     *
     * @param event event
     * @return Executor
     */
    protected abstract Executor getTaskExecutor(BaseEvent event);

    /**
     * handle event
     * 
     * @param event event
     * @param sync sync
     * @param listeners listeners
     * @param executor executor
     */
    protected void onEvent(BaseEvent event, boolean sync, List<BaseEventListener<BaseEvent>> listeners, Executor executor) {
        for (BaseEventListener<BaseEvent> listener : listeners) {
            try {
                if (sync) {
                    listener.onEvent(event);
                } else {
                    try {
                        executor.execute(new AsyncEventHandler(listener, event));
                    } catch (Exception e) {
                        log.error("[Process a event, failure] event: " + event.getClass().getName() + ":" + event.getId(), e);
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * AsyncExecutorHolder
     */
    protected static class AsyncExecutorHolder {
        private static Executor defaultExecutor = Executors.newCachedThreadPool(new NamedThreadFactory("simple-event", true));

        public static Executor getDefaultExecutor() {
            return defaultExecutor;
        }
    }

    /**
     * async event handler
     */
    protected static class AsyncEventHandler implements Runnable {

        private final BaseEventListener<BaseEvent> listener;
        private final BaseEvent                    event;

        public AsyncEventHandler(BaseEventListener<BaseEvent> listener, BaseEvent event) {
            this.listener = listener;
            this.event = event;
        }

        /**
         * When an object implementing interface <code>Runnable</code> is used
         * to create a thread, starting the thread causes the object's
         * <code>run</code> method to be called in that separately executing
         * thread.
         * <p>
         * The general contract of the method <code>run</code> is that it may
         * take any action whatsoever.
         *
         * @see     Thread#run()
         */
        @Override
        public void run() {
            try {
                listener.onEvent(event);
            } catch (Exception e) {
                log.error("[Process a event, failure] event: " + event.getClass().getName() + ":" + event.getId(), e);
            }
        }
    }

    /**
     * simple named thread factory
     */
    protected static class NamedThreadFactory implements ThreadFactory {

        private final AtomicInteger threadNum = new AtomicInteger(1);

        /** */
        private final String        prefix;

        /** */
        private final boolean       daemon;

        /**  */
        private final ThreadGroup   tGroup;

        public NamedThreadFactory(String prefix, boolean daemon) {
            this.prefix = prefix + "-thread-";
            this.daemon = daemon;
            SecurityManager s = System.getSecurityManager();
            this.tGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
        }

        /**
         * Constructs a new {@code Thread}.  Implementations may also initialize
         * priority, name, daemon status, {@code ThreadGroup}, etc.
         *
         * @param r a runnable to be executed by new thread instance
         * @return constructed thread, or {@code null} if the request to
         *         create a thread is rejected
         */
        @Override
        public Thread newThread(Runnable r) {
            String name = prefix + threadNum.getAndIncrement();
            Thread ret = new Thread(tGroup, r, name, 0);
            ret.setDaemon(daemon);
            return ret;
        }
    }
}
